package com.jokerku.mini.schedule.config;

import com.jokerku.mini.schedule.annotation.MiniScheduled;
import com.jokerku.mini.schedule.common.Constants;
import com.jokerku.mini.schedule.domain.ExecOrder;
import com.jokerku.mini.schedule.domain.ZkPath;
import com.jokerku.mini.schedule.event.InstructEventListener;
import com.jokerku.mini.schedule.event.ServerNodeEventListener;
import com.jokerku.mini.schedule.exception.MiniScheduleException;
import com.jokerku.mini.schedule.leader.LeaderService;
import com.jokerku.mini.schedule.service.HeartbeatService;
import com.jokerku.mini.schedule.service.ZkCuratorServer;
import com.jokerku.mini.schedule.sharding.ShardingService;
import com.jokerku.mini.schedule.task.ScheduleExec;
import com.jokerku.mini.schedule.util.IPUtil;
import org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.util.ReflectionUtils;

import java.lang.reflect.Method;
import java.util.*;

import static com.jokerku.mini.schedule.common.Constants.Global.*;

/**
 * @Author: guzq
 * @CreateTime: 2023/05/24 15:42
 * @Description: 定时任务总配置类
 * @Version: 1.0
 */
public class MiniScheduleConfiguration implements ApplicationContextAware, DisposableBean, BeanPostProcessor, ApplicationListener<ContextRefreshedEvent> {

    private static final Logger logger = LoggerFactory.getLogger(MiniScheduleConfiguration.class);

    private final Set<Class<?>> resolvedClasses = new HashSet<>(64);

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Constants.Global.applicationContext = applicationContext;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // 获取 Class 对象
        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
        // 已解析不做处理
        if (resolvedClasses.contains(targetClass)) return bean;
        // 获取定时任务方法
        Method[] methods = ReflectionUtils.getAllDeclaredMethods(targetClass);
        for (Method method : methods) {
            MiniScheduled miniScheduled = AnnotationUtils.findAnnotation(method, MiniScheduled.class);
            if (miniScheduled == null) continue;
            if (miniScheduled.shardingTotalCount() < 1) {
                throw new MiniScheduleException(String.format("Param [shardingTotalCount] cannot be less 1 in %s", beanName + HORIZONTAL_LINE + method.getName()));
            }
            List<ExecOrder> execOrders = Constants.EXEC_ORDER_MAP.computeIfAbsent(beanName, key -> new ArrayList<>());
            // 包装成为定时任务
            ExecOrder execOrder = new ExecOrder();
            execOrder.setBean(bean);
            execOrder.setBeanName(beanName);
            execOrder.setMethodName(method.getName());
            execOrder.setCron(miniScheduled.cron());
            execOrder.setAutoStartup(miniScheduled.autoStartUp());
            execOrder.setDesc(miniScheduled.desc());
            execOrder.setShardingTotalCount(miniScheduled.shardingTotalCount());
            execOrder.setJobParameter(miniScheduled.jobParameter());

            execOrders.add(execOrder);
            resolvedClasses.add(targetClass);
        }

        return bean;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        ApplicationContext applicationContext = contextRefreshedEvent.getApplicationContext();
        try {
            // 1.加载配置
            initConfig(applicationContext);
            // 2.初始化节点
            initZK();
            // 3.选举主从节点
            electionLeader();
            // 4.重置任务分片状态
            ShardingService.getInstance().setReShardingFlag();
            // 5.启动定时任务 挂载节点
            ScheduleExec.getInstance().initTask().initNode();
            // 6.心跳检测
            HeartbeatService.getInstance().startRefreshSchedule();
            logger.info("Mini schedule {} start success", ip);
        } catch (Exception e) {
            logger.error("mini schedule init error", e);
            throw new RuntimeException(e);
        }

    }

    private void electionLeader() throws Exception {
        leaderService = new LeaderService(client, ZkPath.getMasterPath());
        leaderService.selectLeader();
    }

    private void initConfig(ApplicationContext applicationContext) {
        try {
            MiniScheduleConfigProperties properties = applicationContext.getBean(MiniScheduleConfigProperties.class);
            // ip = InetAddress.getLocalHost().getHostAddress();
//            ip = "210.44.206.195";
//        ip = "61.234.165.60";
//        ip = "31.541.571.401";
            ip = IPUtil.getRandomIp();
            scheduleServerId = properties.getScheduleServerId();
            scheduleServerName = properties.getScheduleServerName();
            zkAddress = properties.getZkAddress();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void initZK() {
        try {
            // 获取客户端
            CuratorFramework client = ZkCuratorServer.getClient(zkAddress);
            // 创建节点
            ZkCuratorServer.create(client, ZkPath.getBasePath());
            ZkCuratorServer.setData(client, ZkPath.getBasePath(), scheduleServerName); // /mini/schedule/server/mini-schedule-spring-boot-starter-test -> 分布式任务测试

            ZkCuratorServer.create(client, ZkPath.getServerTaskPath());
            ZkCuratorServer.setData(client, ZkPath.getServerTaskPath(), ip); // /mini/schedule/server/mini-schedule-spring-boot-starter-test/tasks -> xxx.xxx.xxx

            // 在 ip 目录新增当前 ip 临时节点
            ZkCuratorServer.createEphemeralNode(client, ZkPath.getServerRunningIpPath(), ip); // /mini/schedule/server/mini-schedule-spring-boot-starter-test/ip/100.67.129.xx
            // 监听 ip 目录
            ZkCuratorServer.addNodeCacheListener(client, ZkPath.buildServerRunningIpPath(scheduleServerId), new ServerNodeEventListener());

            // 添加后台操作事件节点&监听
            ZkCuratorServer.createNodeSimple(client, ZkPath.geExecPath()); // /mini/schedule/server/mini-schedule-spring-boot-starter-test/exec
            ZkCuratorServer.addTreeCacheListener(client, ZkPath.geExecPath(), new InstructEventListener());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void destroy() {
        if (null != client) {
            client.close();
        }
    }
}
